package com.yifeng.repo.toolkit.elasticsearch;

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;

import java.util.List;
import java.util.Map;

/**
 * Created by daibing on 2021/5/18.
 */
@Slf4j
public class ElasticSearchDataWorker {
    private final RestHighLevelClient esClient;

    public ElasticSearchDataWorker(ElasticSearchClient client) {
        // 默认仅支持单集群
        this.esClient = client.getClient();
    }

    public int bulkInsert(String indexName, List<Map<String, Object>> insertDataList) {
        return this.bulkInsert(indexName, insertDataList, null, false);
    }

    public int bulkInsert(String indexName, List<Map<String, Object>> insertDataList, String dataKey) {
        return this.bulkInsert(indexName, insertDataList, dataKey, false);
    }

    public int bulkInsert(String indexName, List<Map<String, Object>> insertDataList, String dataKey, boolean sync) {
        if (insertDataList.isEmpty()) {
            return 0;
        }
        BulkRequest request = new BulkRequest();
        for (Map<String, Object> data : insertDataList) {
            if (!Strings.isNullOrEmpty(dataKey) && data.containsKey(dataKey)) {
                String key = data.get(dataKey).toString();
                request.add(new IndexRequest(indexName).id(key).source(data));
            } else {
                request.add(new IndexRequest(indexName).source(data));
            }
        }
        if (sync) {
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
            request.waitForActiveShards(ActiveShardCount.ONE);
        } else {
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
        }
        request.timeout(TimeValue.timeValueMinutes(2));
        try {
            BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
            int count = 0;
            if (response.hasFailures()) {
                for (BulkItemResponse item : response.getItems()) {
                    if (item.isFailed()) {
                        log.warn("bulk insert data failed: id={}, error={}", item.getId(), item.getFailureMessage());
                        continue;
                    }
                    count++;
                }
            } else {
                count = response.getItems().length;
            }
            log.info("bulk insert data: indexName={}, request.size={}, success.count={}, took={} ms",
                    indexName, request.requests().size(), count, response.getTook().millis());
            return count;
        } catch (Throwable t) {
            log.warn("bulk insert data failed: indexName={}, error=", indexName, t);
            throw new RuntimeException("bulk insert data " + indexName + " failed, error: ", t);
        }
    }

    public int bulkInsertOrUpdate(String indexName, List<Map<String, Object>> updateDataList, String dataKey, DataUpdateHandler handler) {
        return this.bulkInsertOrUpdate(indexName, updateDataList, dataKey, handler, false);
    }

    public int bulkInsertOrUpdate(String indexName, List<Map<String, Object>> insertDataList, String dataKey, DataUpdateHandler handler, boolean sync) {
        if (insertDataList.isEmpty()) {
            return 0;
        }
        BulkRequest request = new BulkRequest();
        for (Map<String, Object> insertData : insertDataList) {
            if (Strings.isNullOrEmpty(dataKey) || !insertData.containsKey(dataKey)) {
                continue;
            }
            String key = insertData.get(dataKey).toString();
            Map<String, Object> updateData = handler.buildUpdateData(insertData);
            List<String> incrementFileds = handler.listIncrementField();
            if (incrementFileds == null || incrementFileds.isEmpty()) {
                request.add(new UpdateRequest(indexName, key).upsert(insertData).doc(updateData));
            } else {
                StringBuilder sb = new StringBuilder();
                for (String filed : updateData.keySet()) {
                    if (incrementFileds.contains(filed)) {
                        sb.append("ctx._source.").append(filed).append("+=").append("params.").append(filed).append(";");
                    } else {
                        sb.append("ctx._source.").append(filed).append("=").append("params.").append(filed).append(";");
                    }
                }
                Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, sb.toString(), insertData);
                request.add(new UpdateRequest(indexName, key).upsert(insertData).script(script));
            }
        }
        if (sync) {
            request.timeout(TimeValue.timeValueMinutes(2));
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
            request.waitForActiveShards(ActiveShardCount.ONE);
        } else {
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
        }
        try {
            BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
            int count = 0;
            if (response.hasFailures()) {
                for (BulkItemResponse item : response.getItems()) {
                    if (item.isFailed()) {
                        log.warn("bulk insert or update data failed: id={}, error={}", item.getId(), item.getFailureMessage());
                        continue;
                    }
                    count++;
                }
            } else {
                count = response.getItems().length;
            }
            log.info("bulk insert or update data: indexName={}, request.size={}, success.count={}, took={} ms",
                    indexName, request.requests().size(), count, response.getTook().millis());
            return count;
        } catch (Throwable t) {
            log.warn("bulk insert or update data failed: indexName={}, error=", indexName, t);
            throw new RuntimeException("bulk insert or update data " + indexName + " failed, error: ", t);
        }
    }

    public long bulkUpdate(String indexName, QueryBuilder condition, int waveCount, Script script) {
        // 1. 准备查询: 查询条件、版本冲突后继续执行
        UpdateByQueryRequest request = new UpdateByQueryRequest(indexName)
                .setQuery(condition)
                .setScript(script)
                .setBatchSize(waveCount)
                .setScroll(TimeValue.timeValueMinutes(10))
                .setTimeout(TimeValue.timeValueMinutes(2))
                .setAbortOnVersionConflict(false)
                .setRefresh(true);

        // 2. 执行查询：解析结果
        try {
            BulkByScrollResponse response = esClient.updateByQuery(request, RequestOptions.DEFAULT);
            log.info("bulk update data: indexName={}, totalHits={}, delete.update={}, took={} ms",
                    indexName, response.getTotal(), response.getStatus().getUpdated(), response.getTook().millis());
            return response.getStatus().getUpdated();
        } catch (Throwable t) {
            log.warn("bulk update data failed: indexName={}, error=", indexName, t);
            throw new RuntimeException("bulk update data " + indexName + " failed, error: ", t);
        }
    }

    public int delete(String indexName, String id) {
        DeleteRequest request = new DeleteRequest(indexName, id);
        try {
            esClient.delete(request, RequestOptions.DEFAULT);
            return 1;
        } catch (Throwable t) {
            log.warn("delete failed: indexName={}, id={}, error=", indexName, id, t);
            throw new RuntimeException(String.format("delete failed: indexName=%s, id=%s, error=%s", indexName, id, t));
        }
    }

    public long bulkDelete(String indexName, QueryBuilder condition, int waveCount) {
        // 1. 准备查询: 查询条件、版本冲突后继续执行
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName)
                .setQuery(condition)
                .setBatchSize(waveCount)
                .setScroll(TimeValue.timeValueMinutes(10))
                .setTimeout(TimeValue.timeValueMinutes(2))
                .setAbortOnVersionConflict(false)
                .setRefresh(true);

        // 2. 执行查询：解析结果
        try {
            BulkByScrollResponse response = esClient.deleteByQuery(request, RequestOptions.DEFAULT);
            log.info("bulk delete data: indexName={}, totalHits={}, delete.count={}, took={} ms",
                    indexName, response.getTotal(), response.getStatus().getDeleted(), response.getTook().millis());
            return response.getStatus().getDeleted();
        } catch (Throwable t) {
            log.warn("bulk delete data failed: indexName={}, error=", indexName, t);
            throw new RuntimeException("bulk delete data " + indexName + " failed, error: ", t);
        }
    }

    public interface DataUpdateHandler {
        Map<String, Object> buildUpdateData(Map<String, Object> map);
        List<String> listIncrementField();
    }
}
